package com.cyou.weplay.service.test.kafka;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Manual test client that reads messages from a Kafka topic through the
 * ZooKeeper-based consumer connector and prints each payload to standard output.
 *
 * @author zhaomingyu
 */
public class KafkaConsumer {

    /**
     * Charset used to decode message payloads.
     * NOTE(review): assumes producers publish UTF-8 text -- confirm against the producer side.
     * {@code Charset.forName} is used so the class does not require anything newer than Java 6.
     */
    private static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8");

    /**
     * Creates a consumer connector for group {@code zmy-consumers-001}, opens a single
     * stream on {@code topic2} and prints every received message as text.
     * The connector is shut down when the loop ends, whether normally or by an exception.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "zmy-consumers-001");
        // NOTE(review): "true" looks like a value intended for a boolean setting such as
        // auto.commit.enable rather than a consumer identifier; kept unchanged to preserve
        // the current runtime configuration -- confirm the intended value.
        props.put("consumer.id", "true");
        props.put("zookeeper.connect", "10.127.131.22:2181");

        ConsumerConnector cc = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        try {
            String topic = "topic2";
            // Request exactly one stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);

            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                // Decode with an explicit charset so output does not depend on the platform default.
                System.out.println(new String(it.next().message(), MESSAGE_CHARSET));
            }
        } finally {
            // Release the connector's resources instead of leaking them on exit or failure.
            cc.shutdown();
        }
    }
}
